热门标签 | HotTags
当前位置:  开发笔记 > 大数据 > 正文

|NO.Z.00100|——|大数据技术|——|Hadoop与KafkaV07|——|KafkaV07源码解析|——|生产者与消费者流程详解V03|

本文深入解析了KafkaV07的源代码,详细阐述了生产者与消费者的流程机制。通过具体示例和代码分析,帮助读者全面理解Kafka的数据传输和处理过程,为实际应用提供理论支持和技术指导。



[BigDataHadoop:Hadoop&kafka.V07]                                                                          [BigDataHadoop.kafka][|章节四|Hadoop生态圈技术栈|kafka|源码剖析|Kafka源码剖析之Producer消费者流程|]





一、自动提交

### --- 自动提交
~~~ 最简单的提交方式是让悄费者自动提交偏移量。
~~~ 如果enable.auto.commit被设为 true,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。
~~~ 提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。
~~~ 与消费者里的其他东西 一样,自动提交也是在轮询(poll() )里进行的。
~~~ 消费者每次在进行轮询时会检查是否该提交偏移量了,
~~~ 如果是,那 么就会提交从上一次轮询返回的偏移量。
~~~ 不过,这种简便的方式也会带来一些问题,

### --- 来看一下下面的例子:
~~~ 假设我们仍然使用默认的 5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,
~~~ 再 均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。
~~~ 这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。
~~~ 可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,
~~~ 不过这种情况是无也完全避免的

二、手动提交


### --- 同步提交
~~~ 取消自动提交,把 auto.commit.offset 设为 false,让应用程序决定何时提交 偏 移量。
~~~ 使用commitSync() 提交偏移量最简单也最可靠。
~~~ 这个 API会提交由 poll() 方法返回 的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常

while (true) {
// 消息拉取
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),record.key(), record.value());
}
// 处理完成单次消息以后,提交当前的offset,如果提交失败就抛出异常
consumer.commitSync();
}

### --- 异步提交
~~~ 同步提交有一个不足之处,
~~~ 在 broker对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。
~~~ 我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡, 会增加重复消息的数量。
~~~ 这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker的响应。

while (true) {
// 消息拉取
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}

// 异步提交
consumer.commitAsync((offsets, exception) -> {
exception.printStackTrace();
System.out.println(offsets.size());
});
}



===============================END===============================



Walter Savage Landor:strove with none,for none was worth my strife.Nature I loved and, next to Nature, Art:I warm'd both hands before the fire of life.It sinks, and I am ready to depart                                                                                                                                                   ——W.S.Landor



来自为知笔记(Wiz)



推荐阅读
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 本文总结了近年来在实际项目中使用消息中间件的经验和常见问题,旨在为Java初学者和中级开发者提供实用的参考。文章详细介绍了消息中间件在分布式系统中的作用,以及如何通过消息中间件实现高可用性和可扩展性。 ... [详细]
  • 本文介绍了如何在 MapReduce 作业中使用 SequenceFileOutputFormat 生成 SequenceFile 文件,并详细解释了 SequenceFile 的结构和用途。 ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 基于Web的Kafka管理工具Kafkamanager首次访问Web界面的详细配置指南(附图解)
    首次访问Kafkamanager Web界面时,需要对Kafka集群进行配置。这一过程相对简单,用户只需依次点击【Cluster】>【Add Cluster】,按照提示完成相关设置即可。本文将通过图文并茂的方式,详细介绍每一步的配置步骤,帮助用户快速上手Kafkamanager。 ... [详细]
  • 阿里巴巴终面技术挑战:如何利用 UDP 实现 TCP 功能?
    在阿里巴巴的技术面试中,技术总监曾提出一道关于如何利用 UDP 实现 TCP 功能的问题。当时回答得不够理想,因此事后进行了详细总结。通过与总监的进一步交流,了解到这是一道常见的阿里面试题。面试官的主要目的是考察应聘者对 UDP 和 TCP 在原理上的差异的理解,以及如何通过 UDP 实现类似 TCP 的可靠传输机制。 ... [详细]
  • 在当今的软件开发领域,分布式技术已成为程序员不可或缺的核心技能之一,尤其在面试中更是考察的重点。无论是小微企业还是大型企业,掌握分布式技术对于提升工作效率和解决实际问题都至关重要。本周的Java架构师实战训练营中,我们深入探讨了Kafka这一高效的分布式消息系统,它不仅支持发布订阅模式,还能在高并发场景下保持高性能和高可靠性。通过实际案例和代码演练,学员们对Kafka的应用有了更加深刻的理解。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 基于Dubbo与Zipkin的微服务调用链路监控解决方案
    本文提出了一种基于Dubbo与Zipkin的微服务调用链路监控解决方案。通过抽象配置层,支持HTTP和Kafka两种数据上报方式,实现了灵活且高效的调用链路追踪。该方案不仅提升了系统的可维护性和扩展性,还为故障排查提供了强大的支持。 ... [详细]
  • Kafka 是由 Apache 软件基金会开发的高性能分布式消息系统,支持高吞吐量的发布和订阅功能,主要使用 Scala 和 Java 编写。本文将深入解析 Kafka 的安装与配置过程,为程序员提供详尽的操作指南,涵盖从环境准备到集群搭建的每一个关键步骤。 ... [详细]
  • 探究大数据环境下Kafka实现高性能的几个关键因素
    在大数据环境下,Kafka能够实现高性能的关键因素在于其独特的设计和优化策略。尽管Kafka的消息存储在磁盘上,这通常被认为会降低性能,但通过高效的文件管理和批量处理机制,Kafka能够在高吞吐量和低延迟之间取得平衡。此外,Kafka还利用了零拷贝技术、压缩算法和异步IO等手段,进一步提升了系统的整体性能。这些技术不仅保证了数据的可靠性和持久性,还使得Kafka成为处理大规模实时数据流的理想选择。 ... [详细]
  • Kafka核心理论问题汇编【持续更新中】
    本文汇总了Kafka的核心理论问题,涵盖了常见的技术难点和解决方案。内容将持续更新,旨在为开发者提供全面的参考。文章源自博客园,作者呱嗒呱嗒,转载时请注明出处。 ... [详细]
author-avatar
总会有办法的
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有